package gredis

import (
	"context"
	"fmt"
	"github.com/go-redis/redis"
	"math"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"
)

var (
	// errReplacePortPattern matches a colon, a 1-5 digit port number and a
	// trailing "->" (for example ":54321->").
	// NOTE(review): presumably used to mask the ephemeral local port in
	// network error strings; the usage is outside this section - confirm.
	errReplacePortPattern = regexp.MustCompile(`(:\d{1,5}->)`)
	// errReplacePortData is the fixed text intended to replace a match of
	// errReplacePortPattern.
	errReplacePortData    = ":localport->"
)

// CACHEConn is a single cache connection.
//
// newCACHEConnection populates exactly one of the two fields: client for a
// standalone instance, clusterClient for redis-cluster mode. The other field
// stays nil.
type CACHEConn struct {
	// client is the standalone go-redis client (nil in cluster mode).
	client        *redis.Client
	// clusterClient is the go-redis cluster client (nil in standalone mode).
	clusterClient *redis.ClusterClient
}

// newRedisClient builds a standalone go-redis client for the instance
// described by connConf.
//
// The address is "<connConf.Host>:<connConf.Port>" and the password comes from
// groupConf. Pool sizing is taken from connConf; every timeout is taken from
// subGroupConf and multiplied by time.Millisecond, i.e. the configured numbers
// are treated as millisecond counts.
func newRedisClient(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf) (client *redis.Client) {
	opts := &redis.Options{
		Addr:     fmt.Sprintf("%s:%d", connConf.Host, connConf.Port),
		Password: groupConf.Password,

		// Connection establishment and per-command I/O deadlines.
		// NOTE(review): per the go-redis docs a zero value falls back to the
		// library default (5s dial, 3s read, write = read) - confirm for the
		// vendored version.
		DialTimeout:  subGroupConf.DialTimeout * time.Millisecond,
		ReadTimeout:  subGroupConf.ReadTimeout * time.Millisecond,
		WriteTimeout: subGroupConf.WriteTimeout * time.Millisecond,

		// Connection pool sizing and pool-level timeouts.
		PoolSize:     connConf.PoolSize,
		MinIdleConns: connConf.MinIdleConns,
		PoolTimeout:  subGroupConf.PoolTimeout * time.Millisecond,
		IdleTimeout:  subGroupConf.IdleTimeout * time.Millisecond,
	}
	return redis.NewClient(opts)
}

// newRedisClusterClient builds a go-redis cluster client whose seed addresses
// are every "<host>:<port>" pair listed in groupConf.Cluster.Instances.
//
// Pool sizing and timeouts come from subGroupConf; timeouts are multiplied by
// time.Millisecond, i.e. the configured numbers are treated as millisecond
// counts. connConf is accepted for signature symmetry with newRedisClient but
// is not read here.
func newRedisClusterClient(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf) (client *redis.ClusterClient) {
	instances := groupConf.Cluster.Instances
	addrs := make([]string, 0, len(instances))
	for idx := range instances {
		addrs = append(addrs, fmt.Sprintf("%s:%d", instances[idx].Host, instances[idx].Port))
	}

	opts := &redis.ClusterOptions{
		Addrs:    addrs,
		Password: groupConf.Password,

		// Connection establishment and per-command I/O deadlines.
		DialTimeout:  subGroupConf.DialTimeout * time.Millisecond,
		ReadTimeout:  subGroupConf.ReadTimeout * time.Millisecond,
		WriteTimeout: subGroupConf.WriteTimeout * time.Millisecond,

		// Connection pool sizing and pool-level timeouts.
		PoolSize:     subGroupConf.PoolSize,
		MinIdleConns: subGroupConf.MinIdleConns,
		PoolTimeout:  subGroupConf.PoolTimeout * time.Millisecond,
		IdleTimeout:  subGroupConf.IdleTimeout * time.Millisecond,
	}
	return redis.NewClusterClient(opts)
}

// newCACHEConnection normalizes the configuration via initOptions and then
// creates the underlying go-redis client.
//
// A nil connConf selects redis-cluster mode (only clusterClient is set on the
// result); a non-nil connConf selects a standalone client (only client is
// set). The returned error is always nil in the current implementation.
func newCACHEConnection(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf, hostCountMap map[string]int) (cacheConn *CACHEConn, err error) {
	initOptions(groupConf, subGroupConf, connConf, hostCountMap)

	if connConf == nil {
		// No per-instance config: this connection talks to a cluster.
		return &CACHEConn{
			clusterClient: newRedisClusterClient(groupConf, subGroupConf, connConf),
		}, nil
	}

	return &CACHEConn{
		client: newRedisClient(groupConf, subGroupConf, connConf),
	}, nil
}

// initOptions fills in default pool settings and derives the per-instance
// pool size. Both subGroupConf and connConf are modified in place.
//
// Steps:
//  1. Every non-positive sizing field on subGroupConf is replaced with its
//     DEFAULT_* value.
//  2. If connConf is nil (redis-cluster mode) nothing else is done.
//  3. If subGroupConf.ConnMode is 1 ("smart" mode), connConf.PoolSize and
//     connConf.MinIdleConns are computed from the CPU count per pod
//     (GE_CPU_PER_POD), the pod count (GE_POD_NUM), the per-CPU limits, the
//     global smart-mode budgets, and how many times connConf.Host appears in
//     hostCountMap.
//  4. Otherwise connConf simply copies PoolSize and MinIdleConns from
//     subGroupConf.
//
// groupConf is not read by this function.
func initOptions(groupConf *CACHEGroupConf, subGroupConf *CACHESubGroupConf, connConf *CACHEConnConf, hostCountMap map[string]int) {
	// Replace every non-positive sizing knob with its package default.
	defaults := []struct {
		field    *int
		fallback int
	}{
		{&subGroupConf.PoolSize, DEFAULT_POOL_SIZE},
		{&subGroupConf.MinIdleConns, DEFAULT_MIN_IDLE_CONN},
		{&subGroupConf.DefaultSmartMaxConn, DEFAULT_SMART_MAX_CONN},
		{&subGroupConf.DefaultSmartMinIdleConn, DEFAULT_SMART_MIN_IDLE_CONN},
		{&subGroupConf.DefaultPerCpuMaxConn, DEFAULT_PER_CPU_MAX_CONN},
		{&subGroupConf.DefaultPerCpuMinIdleConn, DEFAULT_PER_CPU_MIN_IDLE_CONN},
	}
	for _, d := range defaults {
		if *d.field <= 0 {
			*d.field = d.fallback
		}
	}

	// redis-cluster mode has no per-instance config to fill in.
	if connConf == nil {
		return
	}

	// Any mode other than smart mode copies the sub-group settings verbatim.
	if subGroupConf.ConnMode != 1 {
		connConf.PoolSize = subGroupConf.PoolSize
		connConf.MinIdleConns = subGroupConf.MinIdleConns
		return
	}

	// CPU cores per pod, read from the environment. The value may be
	// fractional, so it is rounded up. A missing or unparsable value yields 0
	// and is then clamped to 1.
	cpuPerPod, _ := strconv.ParseFloat(os.Getenv("GE_CPU_PER_POD"), 64)
	cpus := int(math.Ceil(cpuPerPod))
	if cpus <= 0 {
		cpus = 1
	}

	// Total number of pods, same fallback rule as above.
	pods, _ := strconv.Atoi(os.Getenv("GE_POD_NUM"))
	if pods <= 0 {
		pods = 1
	}

	// Several configured entries may point at the same host; the per-pod
	// allowance is split between them.
	hostUses := hostCountMap[connConf.Host]
	if hostUses <= 0 {
		hostUses = 1
	}

	// capPerCPU lowers a per-CPU limit when cpus * pods * limit would exceed
	// the global budget, never going below 1 once lowered.
	capPerCPU := func(perCPU, budget int) int {
		if cpus*pods*perCPU > budget {
			perCPU = budget / pods / cpus
			if perCPU == 0 {
				perCPU = 1
			}
		}
		return perCPU
	}

	// perInstance converts a per-CPU limit into the share for this one
	// configured instance, with a floor of 1 when the division yields 0.
	perInstance := func(perCPU int) int {
		share := perCPU * cpus / hostUses
		if share == 0 {
			share = 1
		}
		return share
	}

	connConf.PoolSize = perInstance(capPerCPU(subGroupConf.DefaultPerCpuMaxConn, subGroupConf.DefaultSmartMaxConn))
	connConf.MinIdleConns = perInstance(capPerCPU(subGroupConf.DefaultPerCpuMinIdleConn, subGroupConf.DefaultSmartMinIdleConn))
}

// NewWrapperClient returns a client bound to ctx (obtained from the standalone
// client's WithContext) with the hooks from InjectCtx and InjectPipelineCtx
// installed on it.
//
// A CACHEConn created in redis-cluster mode has no standalone client. In that
// case (or when the receiver itself is nil) an error is returned instead of
// dereferencing a nil client.
func (cacheConn *CACHEConn) NewWrapperClient(ctx context.Context) (currentClient *redis.Client, err error) {
	if cacheConn == nil || cacheConn.client == nil {
		return nil, fmt.Errorf("gredis: standalone redis client is not initialized")
	}

	currentClient = cacheConn.client.WithContext(ctx)

	// Install the per-command and per-pipeline hooks on the derived client.
	currentClient.WrapProcess(cacheConn.InjectCtx(currentClient))
	currentClient.WrapProcessPipeline(cacheConn.InjectPipelineCtx(currentClient))
	return currentClient, nil
}

// NewWrapperClusterClient returns a cluster client bound to ctx (obtained from
// the cluster client's WithContext) with the hooks from InjectClusterCtx and
// InjectClusterPipelineCtx installed on it.
//
// A CACHEConn created for a standalone instance has no cluster client. In that
// case (or when the receiver itself is nil) an error is returned instead of
// dereferencing a nil client.
func (cacheConn *CACHEConn) NewWrapperClusterClient(ctx context.Context) (currentClient *redis.ClusterClient, err error) {
	if cacheConn == nil || cacheConn.clusterClient == nil {
		return nil, fmt.Errorf("gredis: redis cluster client is not initialized")
	}

	currentClient = cacheConn.clusterClient.WithContext(ctx)

	// Install the per-command and per-pipeline hooks on the derived client.
	currentClient.WrapProcess(cacheConn.InjectClusterCtx(currentClient))
	currentClient.WrapProcessPipeline(cacheConn.InjectClusterPipelineCtx(currentClient))
	return currentClient, nil
}

// formatCmds renders redis commands as text for display in cat.
//
// Each command becomes one line holding at most its first two arguments
// (joined by a space), which is all that is reported for troubleshooting. If
// the command carries an error, the error text is appended in parentheses.
// Lines are joined with "\n".
func formatCmds(cmders []redis.Cmder) string {
	lines := make([]string, 0, len(cmders))

	for _, cmder := range cmders {
		// Keep only the leading two arguments.
		head := cmder.Args()
		if len(head) > 2 {
			head = head[:2]
		}

		parts := make([]string, len(head))
		for i, arg := range head {
			parts[i] = fmt.Sprint(arg)
		}

		line := strings.Join(parts, " ")
		if err := cmder.Err(); err != nil {
			line = fmt.Sprintf("%s(%s)", line, err.Error())
		}
		lines = append(lines, line)
	}

	return strings.Join(lines, "\n")
}

// InjectCtx returns the per-command hook passed to WrapProcess on a standalone
// client.
//
// The hook is currently a plain pass-through: it calls the wrapped process
// function and returns its error unchanged. The client argument is not used.
func (cacheConn *CACHEConn) InjectCtx(client *redis.Client) func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
	return func(next func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
		return func(cmder redis.Cmder) error {
			return next(cmder)
		}
	}
}

// InjectPipelineCtx returns the pipeline hook passed to WrapProcessPipeline on
// a standalone client.
//
// The hook is currently a plain pass-through: it calls the wrapped pipeline
// function with the same commands and returns its error unchanged. The client
// argument is not used.
func (cacheConn *CACHEConn) InjectPipelineCtx(client *redis.Client) func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
	return func(next func([]redis.Cmder) error) func([]redis.Cmder) error {
		return func(batch []redis.Cmder) error {
			return next(batch)
		}
	}
}

// InjectClusterCtx returns the per-command hook passed to WrapProcess on a
// cluster client.
//
// The hook is currently a plain pass-through: it calls the wrapped process
// function and returns its error unchanged. The client argument is not used.
func (cacheConn *CACHEConn) InjectClusterCtx(client *redis.ClusterClient) func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
	return func(next func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
		return func(cmder redis.Cmder) error {
			return next(cmder)
		}
	}
}

// InjectClusterPipelineCtx returns the pipeline hook passed to
// WrapProcessPipeline on a cluster client.
//
// The hook is currently a plain pass-through: it calls the wrapped pipeline
// function with the same commands and returns its error unchanged. The client
// argument is not used.
func (cacheConn *CACHEConn) InjectClusterPipelineCtx(client *redis.ClusterClient) func(oldProcess func([]redis.Cmder) error) func([]redis.Cmder) error {
	return func(next func([]redis.Cmder) error) func([]redis.Cmder) error {
		return func(batch []redis.Cmder) error {
			return next(batch)
		}
	}
}
